// SPDX-License-Identifier: Apache-2.0

package schemalog

import (
	"slices"

	"github.com/xataio/pgstream/internal/json"
)

// Schema is the JSON-taggable representation of a schema: the list of its
// tables plus a flag recording whether the schema has been dropped.
type Schema struct {
	Tables []Table `json:"tables"`
	// Dropped will be true if the schema has been deleted
	Dropped bool `json:"dropped,omitempty"`
}

// Table describes a single table of a Schema: its identifiers (Oid, Name,
// PgstreamID), its columns, the names of its primary key columns, and its
// indexes, constraints and foreign keys.
type Table struct {
	Oid               string       `json:"oid"`
	Name              string       `json:"name"`
	Columns           []Column     `json:"columns"`
	PrimaryKeyColumns []string     `json:"primary_key_columns"`
	Indexes           []Index      `json:"indexes"`
	Constraints       []Constraint `json:"constraints"`
	ForeignKeys       []ForeignKey `json:"foreign_keys"`
	// PgstreamID is a unique identifier of the table generated by pgstream
	PgstreamID string `json:"pgstream_id"`
}

// Column describes a single column of a Table. DefaultValue and Metadata are
// pointers and may be nil; DefaultValue is left out of the JSON output when
// nil.
type Column struct {
	Name          string  `json:"name"`
	DataType      string  `json:"type"`
	DefaultValue  *string `json:"default,omitempty"`
	Nullable      bool    `json:"nullable"`
	Generated     bool    `json:"generated"`
	GeneratedKind string  `json:"generated_kind,omitempty"`
	Identity      string  `json:"identity,omitempty"`
	Unique        bool    `json:"unique"`
	// Metadata is NOT typed here because we don't fully control the content that is sent from the publisher.
	Metadata   *string `json:"metadata"`
	PgstreamID string  `json:"pgstream_id"`
}

// Index describes a table index: its name, the names of its columns, whether
// it is unique, and its definition string.
type Index struct {
	Name       string   `json:"name"`
	Columns    []string `json:"columns"`
	Unique     bool     `json:"unique"`
	Definition string   `json:"definition"`
}

// Constraint describes a table constraint by name, type and definition
// string.
type Constraint struct {
	Name       string `json:"name"`
	Type       string `json:"type"`
	Definition string `json:"definition"`
}

// ForeignKey describes a table foreign key by name and definition string.
type ForeignKey struct {
	Name       string `json:"name"`
	Definition string `json:"definition"`
}

// MarshalJSON encodes the schema as a JSON string whose content is the
// schema's own JSON document: the struct is serialised first and the
// resulting text is then marshalled again as a string value. A nil receiver
// yields (nil, nil).
func (s *Schema) MarshalJSON() ([]byte, error) {
	if s == nil {
		return nil, nil
	}

	// plainSchema has the same fields as Schema but none of its methods, so
	// marshalling it does not call back into this MarshalJSON.
	type plainSchema Schema

	document, err := json.Marshal(plainSchema(*s))
	if err != nil {
		return nil, err
	}

	quoted, err := json.Marshal(string(document))
	return quoted, err
}

// IsEqual reports whether s and other hold equal tables. Tables are compared
// position by position using Table.IsEqual, so the order of the tables
// matters. Two nil schemas are equal; a nil and a non-nil schema are not.
// The Dropped flag does not take part in the comparison.
func (s *Schema) IsEqual(other *Schema) bool {
	if s == nil || other == nil {
		// Equal only when both sides are nil.
		return s == nil && other == nil
	}

	if len(s.Tables) != len(other.Tables) {
		return false
	}

	for idx := range s.Tables {
		mine, theirs := &s.Tables[idx], &other.Tables[idx]
		if !mine.IsEqual(theirs) {
			return false
		}
	}

	return true
}

// getTableByName returns a copy of the first table whose Name equals
// tableName, together with true. When no table matches it returns the zero
// Table and false.
func (s *Schema) getTableByName(tableName string) (Table, bool) {
	for idx := range s.Tables {
		if s.Tables[idx].Name != tableName {
			continue
		}
		return s.Tables[idx], true
	}
	return Table{}, false
}

// TableNames returns the names of the schema tables, in the same order as
// s.Tables. The result is a newly allocated, non-nil slice even when the
// schema has no tables.
func (s *Schema) TableNames() []string {
	names := make([]string, len(s.Tables))
	for idx := range s.Tables {
		names[idx] = s.Tables[idx].Name
	}
	return names
}

// IsEqual reports whether t and other describe the same table. Oid,
// PgstreamID and Name must match, and the columns, indexes, constraints and
// foreign keys must be equal irrespective of their order. Two nil tables are
// equal; a nil and a non-nil table are not. PrimaryKeyColumns does not take
// part in the comparison.
func (t *Table) IsEqual(other *Table) bool {
	if t == nil || other == nil {
		// Equal only when both sides are nil.
		return t == nil && other == nil
	}

	// Cheap checks first: collection sizes, then the scalar identifiers.
	sameSizes := len(t.Columns) == len(other.Columns) &&
		len(t.Indexes) == len(other.Indexes) &&
		len(t.Constraints) == len(other.Constraints) &&
		len(t.ForeignKeys) == len(other.ForeignKeys)
	if !sameSizes {
		return false
	}

	sameIdentity := t.Oid == other.Oid &&
		t.PgstreamID == other.PgstreamID &&
		t.Name == other.Name
	if !sameIdentity {
		return false
	}

	if !unorderedColumnsEqual(t.Columns, other.Columns) {
		return false
	}
	if !unorderedIndexesEqual(t.Indexes, other.Indexes) {
		return false
	}
	if !unorderedConstraintsEqual(t.Constraints, other.Constraints) {
		return false
	}
	return unorderedForeignKeysEqual(t.ForeignKeys, other.ForeignKeys)
}

// GetColumnByName looks up a column by name. It returns a copy of the first
// column whose Name equals name, together with true; when there is no such
// column it returns the zero Column and false.
func (t *Table) GetColumnByName(name string) (Column, bool) {
	for idx := range t.Columns {
		if t.Columns[idx].Name != name {
			continue
		}
		return t.Columns[idx], true
	}
	return Column{}, false
}

// GetFirstUniqueNotNullColumn returns a pointer to the unique, not nullable
// column with the smallest pgstream ID (compared as strings), or nil when the
// table has no such column. The pgstream ID is used instead of the name since
// the id doesn't change. The returned pointer refers to the element stored in
// t.Columns, not to a copy.
func (t *Table) GetFirstUniqueNotNullColumn() *Column {
	var first *Column
	for i := range t.Columns {
		col := &t.Columns[i]
		if !col.Unique || col.Nullable {
			continue
		}
		// Track the minimum ID in a single pass instead of collecting and
		// sorting all candidates. On equal IDs the later column wins.
		if first == nil || col.PgstreamID <= first.PgstreamID {
			first = col
		}
	}
	return first
}

// IsEqual reports whether c and other describe the same column, comparing
// every field. The optional DefaultValue and Metadata fields are compared by
// the strings they point to (nil only equals nil), not by pointer identity,
// so columns built from separate allocations still compare equal. Two nil
// columns are equal; a nil and a non-nil column are not.
func (c *Column) IsEqual(other *Column) bool {
	if c == nil || other == nil {
		// Equal only when both sides are nil.
		return c == nil && other == nil
	}

	// sameString compares two optional strings by value.
	sameString := func(a, b *string) bool {
		if a == nil || b == nil {
			return a == b
		}
		return *a == *b
	}

	return c.Name == other.Name &&
		c.DataType == other.DataType &&
		c.Nullable == other.Nullable &&
		c.PgstreamID == other.PgstreamID &&
		sameString(c.DefaultValue, other.DefaultValue) &&
		c.Unique == other.Unique &&
		c.Generated == other.Generated &&
		c.GeneratedKind == other.GeneratedKind &&
		c.Identity == other.Identity &&
		sameString(c.Metadata, other.Metadata)
}

// IsGenerated reports whether the column is flagged as generated or has a
// non-empty Identity.
func (c *Column) IsGenerated() bool {
	if c.Generated {
		return true
	}
	return c.Identity != ""
}

// unorderedColumnsEqual reports whether a and b contain the same columns,
// ignoring order. Columns are compared with Column.IsEqual and every element
// of b can be paired with at most one element of a, so repeated columns must
// appear the same number of times on both sides.
func unorderedColumnsEqual(a, b []Column) bool {
	if len(a) != len(b) {
		return false
	}

	// matched[j] is set once b[j] has been paired with an element of a.
	// Without it, a repeated element of a could be satisfied several times
	// by one element of b and differing inputs would be reported as equal.
	matched := make([]bool, len(b))
	for i := range a {
		found := false
		for j := range b {
			if matched[j] || !a[i].IsEqual(&b[j]) {
				continue
			}
			matched[j] = true
			found = true
			break
		}
		if !found {
			return false
		}
	}

	return true
}

// unorderedIndexesEqual reports whether a and b contain the same indexes,
// ignoring order. Indexes are compared with Index.IsEqual and every element
// of b can be paired with at most one element of a, so repeated indexes must
// appear the same number of times on both sides.
func unorderedIndexesEqual(a, b []Index) bool {
	if len(a) != len(b) {
		return false
	}

	// matched[j] is set once b[j] has been paired with an element of a.
	// Without it, a repeated element of a could be satisfied several times
	// by one element of b and differing inputs would be reported as equal.
	matched := make([]bool, len(b))
	for i := range a {
		found := false
		for j := range b {
			if matched[j] || !a[i].IsEqual(&b[j]) {
				continue
			}
			matched[j] = true
			found = true
			break
		}
		if !found {
			return false
		}
	}

	return true
}

// IsEqual reports whether i and other describe the same index: same Name,
// Unique flag and Definition, and the same column names in the same order.
// Two nil indexes are equal; a nil and a non-nil index are not.
func (i *Index) IsEqual(other *Index) bool {
	if i == nil || other == nil {
		// Equal only when both sides are nil.
		return i == nil && other == nil
	}

	if i.Name != other.Name || i.Unique != other.Unique || i.Definition != other.Definition {
		return false
	}

	return slices.Equal(i.Columns, other.Columns)
}

// unorderedConstraintsEqual reports whether a and b contain the same
// constraints, ignoring order. Constraints are compared with
// Constraint.IsEqual and every element of b can be paired with at most one
// element of a, so repeated constraints must appear the same number of times
// on both sides.
func unorderedConstraintsEqual(a, b []Constraint) bool {
	if len(a) != len(b) {
		return false
	}

	// matched[j] is set once b[j] has been paired with an element of a.
	// Without it, a repeated element of a could be satisfied several times
	// by one element of b and differing inputs would be reported as equal.
	matched := make([]bool, len(b))
	for i := range a {
		found := false
		for j := range b {
			if matched[j] || !a[i].IsEqual(&b[j]) {
				continue
			}
			matched[j] = true
			found = true
			break
		}
		if !found {
			return false
		}
	}

	return true
}

// unorderedForeignKeysEqual reports whether a and b contain the same foreign
// keys, ignoring order. Foreign keys are compared with ForeignKey.IsEqual and
// every element of b can be paired with at most one element of a, so repeated
// foreign keys must appear the same number of times on both sides.
func unorderedForeignKeysEqual(a, b []ForeignKey) bool {
	if len(a) != len(b) {
		return false
	}

	// matched[j] is set once b[j] has been paired with an element of a.
	// Without it, a repeated element of a could be satisfied several times
	// by one element of b and differing inputs would be reported as equal.
	matched := make([]bool, len(b))
	for i := range a {
		found := false
		for j := range b {
			if matched[j] || !a[i].IsEqual(&b[j]) {
				continue
			}
			matched[j] = true
			found = true
			break
		}
		if !found {
			return false
		}
	}

	return true
}

// IsEqual reports whether c and other have the same Name, Type and
// Definition. Two nil constraints are equal; a nil and a non-nil constraint
// are not.
func (c *Constraint) IsEqual(other *Constraint) bool {
	if c == nil || other == nil {
		// Equal only when both sides are nil.
		return c == nil && other == nil
	}

	if c.Name != other.Name {
		return false
	}
	if c.Type != other.Type {
		return false
	}
	return c.Definition == other.Definition
}

// IsEqual reports whether fk and other have the same Name and Definition.
// Two nil foreign keys are equal; a nil and a non-nil foreign key are not.
func (fk *ForeignKey) IsEqual(other *ForeignKey) bool {
	if fk == nil || other == nil {
		// Equal only when both sides are nil.
		return fk == nil && other == nil
	}

	if fk.Name != other.Name {
		return false
	}
	return fk.Definition == other.Definition
}
